Bottleneck simulation#
In this demonstration, we’ll construct a basic bottleneck situation and simulate the evacuation of agents.
Introduction[CB]
With this simulation, we want to show the effects of a bottleneck on a crowd. The goal is to understand what problems could emerge from such a geometry - or what benefits could result from it. In order to quantify the results, it is possible to measure the density at the entrance of the bottleneck.
The observations and the results will of course depend on the parameters you choose to enter.
Let’s begin by importing the required packages for our simulation:
Show code cell source
from shapely import GeometryCollection, Polygon, to_wkt, unary_union
import pathlib
import jupedsim as jps
from jupedsim.distributions import distribute_by_number
import sqlite3 # parse trajectory db
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from plotly.graph_objs import Figure
import pedpy # analysis
%matplotlib inline
Setting up a geometry#
We will be using the a bottleneck setup as used in the experiments published by [Adrian2018].
complete_area = [
(3.5, -2),
(3.5, 8),
(-3.5, 8),
(-3.5, -2),
]
obstacles = [
# left barrier
[
(-0.7, -1.1),
(-0.25, -1.1),
(-0.25, -0.15),
(-0.4, 0.0),
(-2.8, 0.0),
(-2.8, 6.7),
(-3.05, 6.7),
(-3.05, -0.3),
(-0.7, -0.3),
(-0.7, -1.0),
],
# right barrier
[
(0.25, -1.1),
(0.7, -1.1),
(0.7, -0.3),
(3.05, -0.3),
(3.05, 6.7),
(2.8, 6.7),
(2.8, 0.0),
(0.4, 0.0),
(0.25, -0.15),
(0.25, -1.1),
],
]
walkable_area = pedpy.WalkableArea(complete_area, obstacles)
Show code cell source
pedpy.plot_walkable_area(walkable_area=walkable_area);
Operational model#
Once the geometry is set, our subsequent task is to specify the model and its associated parameters. For this demonstration, we’ll employ the “collision-free” model.
Setting Up the Simulation Object#
Having established the model and geometry details, and combined with other parameters such as the time step \(dt\), we can proceed to construct our simulation object as illustrated below:
trajectory_file = "trajectories_bottlenck.sqlite"
simulation = jps.Simulation(
model=jps.CollisionFreeSpeedModel(),
geometry=walkable_area.polygon,
trajectory_writer=jps.SqliteTrajectoryWriter(
output_file=pathlib.Path(trajectory_file)
),
)
Specifying Routing Details#
At this juncture, we’ll provide basic routing instructions, guiding the agents to progress towards an exit point.
exit_polygon = [(-0.2, -2), (0.2, -2), (0.2, -1.7), (-0.2, -1.7)]
exit_id = simulation.add_exit_stage(exit_polygon)
journey = jps.JourneyDescription([exit_id])
journey_id = simulation.add_journey(journey)
Defining and Distributing Agents#
Now, we’ll position the agents and establish their attributes, leveraging previously mentioned parameters such as exit_id and profile_id.
total_agents = 35
positions = distribute_by_number(
polygon=Polygon([[2.5, 6.5], [-2.5, 6.5], [-2.5, 4], [2.5, 4]]),
number_of_agents=total_agents,
distance_to_agents=0.4,
distance_to_polygon=0.4,
seed=45131502,
)
for position in positions:
simulation.add_agent(
jps.CollisionFreeSpeedModelAgentParameters(
journey_id=journey_id, stage_id=exit_id, position=position
)
)
Executing the Simulation#
With all components in place, we’re set to initiate the simulation. For this demonstration, the trajectories will be recorded in an sqlite database.
while simulation.agent_count() > 0 and simulation.elapsed_time() < 180:
simulation.iterate()
Visualizing the Trajectories#
For trajectory visualization, we’ll extract data from the sqlite database. A straightforward method for this is employing the jupedsim-visualizer.
Show code cell source
def read_sqlite_file(trajectory_file: str) -> pedpy.TrajectoryData:
with sqlite3.connect(trajectory_file) as con:
data = pd.read_sql_query(
"select frame, id, pos_x as x, pos_y as y, ori_x as ox, ori_y as oy from trajectory_data",
con,
)
fps = float(
con.cursor()
.execute("select value from metadata where key = 'fps'")
.fetchone()[0]
)
walkable_area = (
con.cursor().execute("select wkt from geometry").fetchone()[0]
)
return (
pedpy.TrajectoryData(data=data, frame_rate=fps),
pedpy.WalkableArea(walkable_area),
)
def speed_to_color(speed, min_speed, max_speed, midpoint):
colorscale = px.colors.diverging.RdBu_r[::-1]
# Normalize speed based on the midpoint
if speed >= midpoint:
normalized_speed = 0.5 + 0.5 * (speed - midpoint) / (
max_speed - midpoint
)
else:
normalized_speed = 0.5 * (speed - min_speed) / (midpoint - min_speed)
# Clip to ensure the value is between 0 and 1
normalized_speed = np.clip(normalized_speed, 0, 1)
# Find the corresponding color in the colorscale
color_idx = int(normalized_speed * (len(colorscale) - 1))
return colorscale[color_idx]
def get_geometry_traces(area):
geometry_traces = []
x, y = area.exterior.xy
geometry_traces.append(
go.Scatter(
x=np.array(x),
y=np.array(y),
mode="lines",
line={"color": "grey"},
showlegend=False,
name="Exterior",
hoverinfo="name",
)
)
for inner in area.interiors:
xi, yi = zip(*inner.coords[:])
geometry_traces.append(
go.Scatter(
x=np.array(xi),
y=np.array(yi),
mode="lines",
line={"color": "grey"},
showlegend=False,
name="Obstacle",
hoverinfo="name",
)
)
return geometry_traces
def get_shapes_for_frame(frame_data, min_speed, max_speed, midpoint):
def create_shape(row):
hover_trace = go.Scatter(
x=[row["x"]],
y=[row["y"]],
text=[f"ID: {row['id']}, Pos({row['x']:.2f},{row['y']:.2f})"],
mode="markers",
marker=dict(size=1, opacity=1),
hoverinfo="text",
showlegend=False,
)
if row["speed"] == -1000: # Check for dummy speed
return (
go.layout.Shape(
type="circle",
xref="x",
yref="y",
x0=row["x"] - row["radius"],
y0=row["y"] - row["radius"],
x1=row["x"] + row["radius"],
y1=row["y"] + row["radius"],
line=dict(width=0),
fillcolor="rgba(255,255,255,0)", # Transparent fill
),
hover_trace,
)
color = speed_to_color(row["speed"], min_speed, max_speed, midpoint)
return (
go.layout.Shape(
type="circle",
xref="x",
yref="y",
x0=row["x"] - row["radius"],
y0=row["y"] - row["radius"],
x1=row["x"] + row["radius"],
y1=row["y"] + row["radius"],
line_color=color,
fillcolor=color,
),
hover_trace,
)
results = frame_data.apply(create_shape, axis=1).tolist()
shapes = [res[0] for res in results]
hover_traces = [res[1] for res in results]
return shapes, hover_traces
def create_fig(
initial_agent_count,
initial_shapes,
initial_hover_trace,
geometry_traces,
hover_traces,
frames,
steps,
area_bounds,
width=800,
height=800,
):
minx, miny, maxx, maxy = area_bounds
fig = go.Figure(
data=geometry_traces + hover_traces + initial_hover_trace,
frames=frames,
layout=go.Layout(
shapes=initial_shapes,
title=f"<b>Number of Agents: {initial_agent_count}</b>",
title_x=0.5,
),
)
fig.update_layout(
updatemenus=[
{
"buttons": [
{
"args": [
None,
{
"frame": {"duration": 100, "redraw": True},
"fromcurrent": True,
},
],
"label": "Play",
"method": "animate",
}
],
"direction": "left",
"pad": {"r": 10, "t": 87},
"showactive": False,
"type": "buttons",
"x": 0.1,
"xanchor": "right",
"y": 0,
"yanchor": "top",
}
],
sliders=[
{
"active": 0,
"yanchor": "top",
"xanchor": "left",
"currentvalue": {
"font": {"size": 20},
"prefix": "Frame:",
"visible": True,
"xanchor": "right",
},
"transition": {"duration": 100, "easing": "cubic-in-out"},
"pad": {"b": 10, "t": 50},
"len": 0.9,
"x": 0.1,
"y": 0,
"steps": steps,
}
],
autosize=False,
width=width,
height=height,
xaxis=dict(range=[minx - 0.5, maxx + 0.5]),
yaxis=dict(
scaleanchor="x", scaleratio=1, range=[miny - 0.5, maxy + 0.5]
),
)
return fig
def animate(
data: pedpy.TrajectoryData, area: pedpy.WalkableArea, *, every_nth_frame=5
):
data_df = pedpy.compute_individual_speed(traj_data=data, frame_step=5)
data_df = data_df.merge(data.data, on=["id", "frame"], how="left")
data_df["radius"] = 0.2
min_speed = data_df["speed"].min()
max_speed = data_df["speed"].max()
midpoint = np.mean(data_df["speed"])
max_agents = data_df.groupby("frame").size().max()
dummy_agent_data = {"x": 0, "y": 0, "radius": 0, "speed": -1000}
frames = []
steps = []
unique_frames = data_df["frame"].unique()
selected_frames = unique_frames[::every_nth_frame]
geometry_traces = get_geometry_traces(area.polygon)
initial_frame_data = data_df[data_df["frame"] == data_df["frame"].min()]
initial_agent_count = len(initial_frame_data)
initial_shapes, initial_hovers = get_shapes_for_frame(
initial_frame_data, min_speed, max_speed, midpoint
)
for frame_num in selected_frames[1:]:
frame_data = data_df[data_df["frame"] == frame_num]
agent_count = len(frame_data)
while len(frame_data) < max_agents:
dummy_df = pd.DataFrame([dummy_agent_data])
frame_data = pd.concat([frame_data, dummy_df], ignore_index=True)
shapes, hover_traces = get_shapes_for_frame(
frame_data, min_speed, max_speed, midpoint
)
frame = go.Frame(
data=geometry_traces + hover_traces,
name=str(frame_num),
layout=go.Layout(
shapes=shapes,
title=f"<b>Number of Agents: {agent_count}</b>",
title_x=0.5,
),
)
frames.append(frame)
step = {
"args": [
[str(frame_num)],
{
"frame": {"duration": 100, "redraw": True},
"mode": "immediate",
"transition": {"duration": 500},
},
],
"label": str(frame_num),
"method": "animate",
}
steps.append(step)
return create_fig(
initial_agent_count,
initial_shapes,
initial_hovers,
geometry_traces,
hover_traces,
frames,
steps,
area.bounds,
width=800,
height=800,
)
trajectory_data, walkable_area = read_sqlite_file(trajectory_file)
animate(trajectory_data, walkable_area)
Measurement of the N-T curve and Voronoi density#
We evaluate the \(N−t\) curve at the bottleneck’s exit. The gradient of this curve provides insights into the flow rate through the bottleneck. Subsequently, we assess the Voronoi density preceding the bottleneck.
To achieve this, we deifne a measurement line, delineated by two points representing the bottleneck’s entrance. Additionally, we designate a square area to gauge the density directly in front of this entrance.
import warnings
warnings.filterwarnings("ignore")
measurement_area = pedpy.MeasurementArea(
[(-0.4, 0.5), (0.4, 0.5), (0.4, 1.3), (-0.4, 1.3)]
)
measurement_line = pedpy.MeasurementLine([(0.25, 0), (-0.25, 0)])
nt, crossing_frames = pedpy.compute_n_t(
traj_data=trajectory_data,
measurement_line=measurement_line,
)
pedpy.plot_measurement_setup(
walkable_area=walkable_area,
hole_color="lightgrey",
traj=trajectory_data,
traj_color="lightblue",
traj_alpha=0.5,
traj_width=1,
measurement_lines=[measurement_line],
measurement_areas=[measurement_area],
ml_color="b",
ma_color="r",
ma_line_color="r",
ma_line_width=1,
ma_alpha=0.2,
).set_aspect("equal")
\(N-t\) curve#
pedpy.plot_nt(nt=nt)
print(f"Evacuation time: {max(nt.time)} seconds")
Evacuation time: 44.0 seconds
Voronoi density#
individual = pedpy.compute_individual_voronoi_polygons(
traj_data=trajectory_data, walkable_area=walkable_area
)
density_voronoi, intersecting = pedpy.compute_voronoi_density(
individual_voronoi_data=individual, measurement_area=measurement_area
)
classic_density = pedpy.compute_classic_density(
traj_data=trajectory_data, measurement_area=measurement_area
)
pedpy.plot_density(density=density_voronoi, color="red")
pedpy.plot_density(
density=classic_density,
title="Voronoi density (red) vs Classic density (blue)",
);
At the entrance of the bottleneck, the density rises. And with the density rising, the risk of
References & Further Exploration#
The operational model discussed in the Model section is based on the collision-free model. JuPedSim also incorporates another model known as GCFM. For more details on GCFM, refer to another notebook (TODO: Link to the GCFM notebook).
Our demonstration employed a straightforward journey with a singular exit. For a more intricate journey featuring multiple intermediate stops and waiting zones, see the upcoming section (TODO: Link to the advanced journey section).
While we designated a single parameter profile for agents in this example, it’s feasible to define multiple parameter profiles. Learn how to alternate between these profiles in the subsequent section (TODO: Link to the profile-switching section).